/*
 * Copyright (c) 2022-2024, NVIDIA CORPORATION.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "lists/utilities.hpp"

#include <cudf/column/column.hpp>
#include <cudf/column/column_factories.hpp>
#include <cudf/detail/concatenate.hpp>
#include <cudf/detail/copy.hpp>
#include <cudf/detail/sorting.hpp>
#include <cudf/detail/structs/utilities.hpp>
#include <cudf/detail/utilities/linked_column.hpp>
#include <cudf/detail/utilities/vector_factories.hpp>
#include <cudf/lists/lists_column_view.hpp>
#include <cudf/table/experimental/row_operators.cuh>
#include <cudf/table/table_view.hpp>
#include <cudf/utilities/type_checks.hpp>
#include <cudf/utilities/type_dispatcher.hpp>

#include <rmm/mr/device/per_device_resource.hpp>

#include <thrust/iterator/transform_iterator.h>

namespace cudf {
namespace experimental {

namespace {

/**
 * @brief Removes the offsets of struct column's children
 *
 * @param c The column whose children are to be un-sliced
 * @return Children of `c` with offsets removed
 */
std::vector<column_view> unslice_children(column_view const& c)
{
  if (c.type().id() == type_id::STRUCT) {
    auto child_it = thrust::make_transform_iterator(c.child_begin(), [](auto const& child) {
      return column_view(
        child.type(),
        child.offset() + child.size(),  // This is hacky, we don't know the actual unsliced size but
                                        // it is at least offset + size
        child.head(),
        child.null_mask(),
        child.null_count(),
        0,
        unslice_children(child));
    });
    return {child_it, child_it + c.num_children()};
  }
  return {c.child_begin(), c.child_end()};
};

/**
 * @brief Removes the child column offsets of struct columns in a table.
 *
 * Given a table, this replaces any struct columns with similar struct columns that have their
 * offsets removed from their children. Structs that are children of list columns are not affected.
 *
 */
table_view remove_struct_child_offsets(table_view table)
{
  std::vector<column_view> cols;
  cols.reserve(table.num_columns());
  std::transform(table.begin(), table.end(), std::back_inserter(cols), [&](column_view const& c) {
    return column_view(c.type(),
                       c.size(),
                       c.head<uint8_t>(),
                       c.null_mask(),
                       c.null_count(),
                       c.offset(),
                       unslice_children(c));
  });
  return table_view(cols);
}

/**
 * @brief The enum to specify whether the `decompose_structs` function will process lists columns
 * (at any nested level) or will output them unchanged.
 */
enum class decompose_lists_column : bool { YES, NO };

/**
 * @brief Decompose all struct columns in a table
 *
 * If a structs column is a tree with N leaves, then this function decomposes the tree into
 * N "linear trees" (branch factor == 1) and prunes common parents. Also returns a vector of
 * per-column `depth`s.
 *
 * A `depth` value is the number of nested levels as parent of the column in the original,
 * non-decomposed table, which are pruned during decomposition.
 *
 * Special handling is needed in the cases of structs column having lists as its first child. In
 * such situations, the function decomposes the tree of N leaves into N+1 linear trees in which the
 * second tree was generated by extracting out leaf of the first tree. This is to make sure there is
 * no structs column having child lists column in the output. Note that structs with lists children
 * in subsequent positions do not require any special treatment because the struct parent will be
 * pruned for all subsequent children.
 *
 * For example, if the original table has a column `Struct<Struct<int, float>, decimal>`,
 *
 *      S1
 *     / \
 *    S2  d
 *   / \
 *  i   f
 *
 * then after decomposition, we get three columns:
 * `Struct<Struct<int>>`, `float`, and `decimal`.
 *
 *  0   2   1  <- depths
 *  S1
 *  |
 *  S2      d
 *  |
 *  i   f
 *
 * The depth of the first column is 0 because it contains all its parent levels, while the depth
 * of the second column is 2 because two of its parent struct levels were pruned.
 *
 * Similarly, a struct column of type `Struct<int, Struct<float, decimal>>` is decomposed as follows
 *
 *     S1
 *    / \
 *   i   S2
 *      / \
 *     f   d
 *
 *  0   1   2  <- depths
 *  S1  S2  d
 *  |   |
 *  i   f
 *
 * In the case of structs column with a lists column as its first child such as
 * `Struct<List<int>, float>`, after decomposition we get three columns `Struct<>`,
 * `List<int>`, and `float`.
 *
 * When list columns are present, depending on the input flag `decompose_lists`, the decomposition
 * can be performed similarly to pure structs but list parent columns are NOT pruned. The list
 * parents are still needed to define the range of elements in the leaf that belong to the same row.
 *
 * For example, if the original table has a column `List<Struct<int, float>>`,
 *
 *    L
 *    |
 *    S
 *   / \
 *  i   f
 *
 * after decomposition, we get two columns
 *
 *  L   L
 *  |   |
 *  S   f
 *  |
 *  i
 *
 * Note that the `decompose_lists` flag should be specified as follow:
 *  - `decompose_lists_column::YES` when preprocessing a table for equality comparison.
 *  - `decompose_lists_column::NO` when preprocessing a table for lexicographic comparison,
 *    since we need to keep all lists columns intact to input into the next preprocessing step.
 *
 * @param table The table whose struct columns to decompose.
 * @param decompose_lists Whether to decompose lists columns
 * @param column_order The per-column order if using output with lexicographic comparison
 * @param null_precedence The per-column null precedence
 * @return A tuple containing a table with all struct columns decomposed, new corresponding column
 *         orders and null precedences and depths of the linearized branches
 */
auto decompose_structs(table_view table,
                       decompose_lists_column decompose_lists,
                       host_span<order const> column_order         = {},
                       host_span<null_order const> null_precedence = {})
{
  auto linked_columns = detail::table_to_linked_columns(table);

  std::vector<column_view> verticalized_columns;
  std::vector<order> new_column_order;
  std::vector<null_order> new_null_precedence;
  std::vector<int> verticalized_col_depths;
  for (size_t col_idx = 0; col_idx < linked_columns.size(); ++col_idx) {
    detail::linked_column_view const* col = linked_columns[col_idx].get();
    if (is_nested(col->type())) {
      // convert and insert
      std::vector<std::vector<detail::linked_column_view const*>> flattened;
      std::function<void(
        detail::linked_column_view const*, std::vector<detail::linked_column_view const*>*, int)>
        recursive_child = [&](detail::linked_column_view const* c,
                              std::vector<detail::linked_column_view const*>* branch,
                              int depth) {
          branch->push_back(c);
          if (decompose_lists == decompose_lists_column::YES && c->type().id() == type_id::LIST) {
            recursive_child(
              c->children[lists_column_view::child_column_index].get(), branch, depth + 1);
          } else if (c->type().id() == type_id::STRUCT) {
            for (size_t child_idx = 0; child_idx < c->children.size(); ++child_idx) {
              // When child_idx == 0, we also cut off the current branch if its first child is a
              // lists column.
              // In such cases, the last column of the current branch will be `Struct<List,...>` and
              // it will be modified to empty struct type `Struct<>` later on.
              if (child_idx > 0 || c->children[0]->type().id() == type_id::LIST) {
                verticalized_col_depths.push_back(depth + 1);
                branch = &flattened.emplace_back();
              }
              recursive_child(c->children[child_idx].get(), branch, depth + 1);
            }
          }
        };
      auto& branch = flattened.emplace_back();
      verticalized_col_depths.push_back(0);
      recursive_child(col, &branch, 0);

      for (auto const& branch : flattened) {
        column_view temp_col = *branch.back();

        // Change `Struct<List,...>` into empty struct type `Struct<>`.
        if (temp_col.type().id() == type_id::STRUCT &&
            (temp_col.num_children() > 0 && temp_col.child(0).type().id() == type_id::LIST)) {
          temp_col = column_view(temp_col.type(),
                                 temp_col.size(),
                                 temp_col.head(),
                                 temp_col.null_mask(),
                                 temp_col.null_count(),
                                 temp_col.offset(),
                                 {});
        }

        for (auto it = branch.crbegin() + 1; it < branch.crend(); ++it) {
          auto const& prev_col = *(*it);
          auto children =
            (prev_col.type().id() == type_id::LIST)
              ? std::vector<column_view>{*prev_col
                                            .children[lists_column_view::offsets_column_index],
                                         temp_col}
              : std::vector<column_view>{temp_col};
          temp_col = column_view(prev_col.type(),
                                 prev_col.size(),
                                 nullptr,
                                 prev_col.null_mask(),
                                 prev_col.null_count(),
                                 prev_col.offset(),
                                 std::move(children));
        }
        // Traverse upward and include any list columns in the ancestors
        for (detail::linked_column_view* parent = branch.front()->parent; parent;
             parent                             = parent->parent) {
          if (parent->type().id() == type_id::LIST) {
            // Include this parent
            temp_col = column_view(
              parent->type(),
              parent->size(),
              nullptr,  // list has no data of its own
              nullptr,  // If we're going through this then nullmask is already in another branch
              0,
              parent->offset(),
              {*parent->children[lists_column_view::offsets_column_index], temp_col});
          } else if (parent->type().id() == type_id::STRUCT) {
            // Replace offset with parent's offset
            temp_col = column_view(temp_col.type(),
                                   parent->size(),
                                   temp_col.head(),
                                   temp_col.null_mask(),
                                   temp_col.null_count(),
                                   parent->offset(),
                                   {temp_col.child_begin(), temp_col.child_end()});
          }
        }
        verticalized_columns.push_back(temp_col);
      }
      if (not column_order.empty()) {
        new_column_order.insert(new_column_order.end(), flattened.size(), column_order[col_idx]);
      }
      if (not null_precedence.empty()) {
        new_null_precedence.insert(
          new_null_precedence.end(), flattened.size(), null_precedence[col_idx]);
      }
    } else {
      verticalized_columns.push_back(*col);
      verticalized_col_depths.push_back(0);
      if (not column_order.empty()) { new_column_order.push_back(column_order[col_idx]); }
      if (not null_precedence.empty()) { new_null_precedence.push_back(null_precedence[col_idx]); }
    }
  }
  return std::make_tuple(table_view(verticalized_columns),
                         std::move(new_column_order),
                         std::move(new_null_precedence),
                         std::move(verticalized_col_depths));
}

/*
 * This helper function generates dremel data for any list-type columns in a
 * table. This data is necessary for lexicographic comparisons.
 */
auto list_lex_preprocess(table_view const& table, rmm::cuda_stream_view stream)
{
  std::vector<detail::dremel_data> dremel_data;
  std::vector<detail::dremel_device_view> dremel_device_views;
  for (auto const& col : table) {
    if (col.type().id() == type_id::LIST) {
      dremel_data.push_back(detail::get_comparator_data(col, {}, false, stream));
      dremel_device_views.push_back(dremel_data.back());
    }
  }
  auto d_dremel_device_views = detail::make_device_uvector_sync(
    dremel_device_views, stream, rmm::mr::get_current_device_resource());
  return std::make_tuple(std::move(dremel_data), std::move(d_dremel_device_views));
}

using column_checker_fn_t = std::function<void(column_view const&)>;

/**
 * @brief Check a table for compatibility with lexicographic comparison
 *
 * Checks whether a given table contains columns of non-relationally comparable types.
 */
void check_lex_compatibility(table_view const& input)
{
  // Basically check if there's any LIST of STRUCT or STRUCT of LIST hiding anywhere in the table
  column_checker_fn_t check_column = [&](column_view const& c) {
    if (c.type().id() == type_id::LIST) {
      auto const& list_col = lists_column_view(c);
      CUDF_EXPECTS(list_col.child().type().id() != type_id::STRUCT,
                   "Cannot lexicographically compare a table with a LIST of STRUCT column");
      check_column(list_col.child());
    } else if (c.type().id() == type_id::STRUCT) {
      for (auto child = c.child_begin(); child < c.child_end(); ++child) {
        CUDF_EXPECTS(child->type().id() != type_id::LIST,
                     "Cannot lexicographically compare a table with a STRUCT of LIST column");
        check_column(*child);
      }
    }
    if (not is_nested(c.type())) {
      CUDF_EXPECTS(is_relationally_comparable(c.type()),
                   "Cannot lexicographic compare a table with a column of type " +
                     cudf::type_to_name(c.type()));
    }
  };
  for (column_view const& c : input) {
    check_column(c);
  }
}

/**
 * @brief Check a table for compatibility with equality comparison
 *
 * Checks whether a given table contains columns of non-equality comparable types.
 */
void check_eq_compatibility(table_view const& input)
{
  column_checker_fn_t check_column = [&](column_view const& c) {
    if (not is_nested(c.type())) {
      CUDF_EXPECTS(is_equality_comparable(c.type()),
                   "Cannot compare equality for a table with a column of type " +
                     cudf::type_to_name(c.type()));
    }
    for (auto child = c.child_begin(); child < c.child_end(); ++child) {
      check_column(*child);
    }
  };
  for (column_view const& c : input) {
    check_column(c);
  }
}

void check_shape_compatibility(table_view const& lhs, table_view const& rhs)
{
  CUDF_EXPECTS(lhs.num_columns() == rhs.num_columns(),
               "Cannot compare tables with different number of columns");
  for (size_type i = 0; i < lhs.num_columns(); ++i) {
    CUDF_EXPECTS(column_types_equivalent(lhs.column(i), rhs.column(i)),
                 "Cannot compare tables with different column types");
  }
}

}  // namespace

namespace row {

namespace lexicographic {

namespace {

/**
 * @brief Replace child of the input lists column by a new child column.
 *
 * If the input is not sliced, just replace the input child by the new_child.
 * Otherwise, we have to generate new offsets and replace both the offsets and the child of the
 * input by the new ones. This is because the new child was generated by ranking and always
 * has zero offset, so it cannot replace the input child if it is sliced.
 *
 * The new generated offsets column needs to be returned and kept alive.
 *
 * @param[in] input The input column_view of type LIST
 * @param[in] new_child A new child column to replace the existing child of the input
 * @param[out] out_cols An array to store the new generated offsets (if applicable)
 * @param[in] stream CUDA stream used for device memory operations and kernel launches
 * @param[in] mr Device memory resource used to allocate the returned column
 * @return An output column_view with child replaced
 */
auto replace_child(column_view const& input,
                   column_view const& new_child,
                   std::vector<std::unique_ptr<column>>& out_cols,
                   rmm::cuda_stream_view stream,
                   rmm::mr::device_memory_resource* mr)
{
  auto const make_output = [&input](auto const& offsets_cv, auto const& child_cv) {
    return column_view{data_type{type_id::LIST},
                       input.size(),
                       nullptr,
                       input.null_mask(),
                       input.null_count(),
                       0,
                       {offsets_cv, child_cv}};
  };

  if (input.offset() == 0) {
    return make_output(input.child(lists_column_view::offsets_column_index), new_child);
  }

  out_cols.emplace_back(
    cudf::lists::detail::get_normalized_offsets(lists_column_view{input}, stream, mr));
  return make_output(out_cols.back()->view(), new_child);
}

/**
 * @brief Compute ranks of the input column.
 *
 * `Dense` rank type must be used for compute ranking of the input for later lexicographic
 * comparison.
 *
 * To understand why, consider: `input = [ [{0, "a"}, {3, "c"}], [{0, "a"}, {2, "b"}] ]`.
 * If first rank is used, `transformed_input = [ [0, 3], [1, 2] ]`. Comparing them will lead
 * to the result row(0) < row(1) which is incorrect.
 * With dense rank, `transformed_input = [ [0, 2], [0, 1] ]`, producing the correct output for
 * lexicographic comparison.
 *
 * In addition, since the input column being ranked is always a nested child column instead of
 * a top-level column, the column order for ranking should be fixed to the same value
 * `order::ASCENDING` in all situations.
 * For example, with the same input above, using column order as `order::ASCENDING` we will have
 * `transformed_input = [ [0, 2], [0, 1] ]`. The output of sorting `transformed_input` will be
 * exactly the same as sorting `input` regardless of the sorting order (ASC or DESC).
 *
 * @param input The input column to compute ranks
 * @param column_null_order The flag indicating how nulls compare to non-null values
 * @param stream CUDA stream used for device memory operations and kernel launches
 * @param mr Device memory resource used to allocate the returned column
 * @return The output rank columns
 */
auto compute_ranks(column_view const& input,
                   null_order column_null_order,
                   rmm::cuda_stream_view stream,
                   rmm::mr::device_memory_resource* mr)
{
  return cudf::detail::rank(input,
                            rank_method::DENSE,
                            order::ASCENDING,
                            null_policy::EXCLUDE,
                            column_null_order,
                            false /*percentage*/,
                            stream,
                            mr);
}

/**
 * @brief Transform any nested lists-of-structs column into lists-of-integers column.
 *
 * For a lists-of-structs column at any nested level, its child structs column will be replaced by a
 * `size_type` column computed as its ranks.
 *
 * If the input column is not lists-of-structs, or does not contain lists-of-structs at any nested
 * level, the input will be passed through without any changes.
 *
 * @param input The input column to transform
 * @param column_null_order The flag indicating how nulls compare to non-null values
 * @param stream CUDA stream used for device memory operations and kernel launches
 * @param mr Device memory resource used to allocate the returned column(s)
 * @return A pair consisting of new column_view representing the transformed input, along with
 *         an array containing its rank column(s) (of `size_type` type) and possibly new list
 *         offsets generated during the transformation process
 */
std::pair<column_view, std::vector<std::unique_ptr<column>>> transform_lists_of_structs(
  column_view const& input,
  null_order column_null_order,
  rmm::cuda_stream_view stream,
  rmm::mr::device_memory_resource* mr)
{
  std::vector<std::unique_ptr<column>> out_cols;

  if (input.type().id() == type_id::LIST) {
    auto const child = cudf::lists_column_view{input}.get_sliced_child(stream);

    // Found a lists-of-structs column.
    if (child.type().id() == type_id::STRUCT) {
      out_cols.emplace_back(compute_ranks(child, column_null_order, stream, mr));
      return {replace_child(input, out_cols.back()->view(), out_cols, stream, mr),
              std::move(out_cols)};
    }
    // Found a lists-of-lists column.
    else if (child.type().id() == type_id::LIST) {
      // Recursively call transformation on the child column.
      auto [new_child, out_cols_child] =
        transform_lists_of_structs(child, column_null_order, stream, mr);

      // Only transform the current column if its child has been transformed.
      if (out_cols_child.size() > 0) {
        out_cols.insert(out_cols.end(),
                        std::make_move_iterator(out_cols_child.begin()),
                        std::make_move_iterator(out_cols_child.end()));
        return {replace_child(input, new_child, out_cols, stream, mr), std::move(out_cols)};
      }
      // else: child was not transformed so input is also not transformed.
    }
    // else: child is not STRUCT or LIST: no transformation.
  }
  // else: lhs.type().id() != type_id::LIST.
  // In such situations, lhs.type().id() can still be type_id::STRUCT. However, any
  // structs-of-lists should be decomposed into empty struct type `Struct<>` before being
  // processed by this function so we do nothing here.

  // Passthrough: nothing changed.
  return {input, std::move(out_cols)};
}

/**
 * @brief Transform any nested lists-of-structs column into lists-of-integers column.
 *
 * For a lists-of-structs column at any nested level, its child structs column will be replaced by a
 * `size_type` column computed as its ranks. In addition, equivalent child columns of both input
 * columns (i.e., child columns at the same order, same nested level) will be combined and
 * ranked together.
 *
 * If the input columns are not lists-of-structs, or do not contain lists-of-structs at any nested
 * level, there will not be any changes.
 *
 * @param lhs The input lhs column to transform
 * @param rhs The input rhs column to transform
 * @param column_null_order The flag indicating how nulls compare to non-null values
 * @param stream CUDA stream used for device memory operations and kernel launches
 * @param mr Device memory resource used to allocate the returned column(s)
 * @return A tuple consisting of new column_view(s) representing the transformed input, along with
 *         their rank column(s) (of `size_type` type) and possibly new list offsets generated
 *         during the transformation process
 */
std::tuple<column_view,
           column_view,
           std::vector<std::unique_ptr<column>>,
           std::vector<std::unique_ptr<column>>>
transform_lists_of_structs(column_view const& lhs,
                           column_view const& rhs,
                           null_order column_null_order,
                           rmm::cuda_stream_view stream,
                           rmm::mr::device_memory_resource* mr)
{
  std::vector<std::unique_ptr<column>> out_cols_lhs;
  std::vector<std::unique_ptr<column>> out_cols_rhs;

  auto const make_output = [&](auto const& new_child_lhs, auto const& new_child_rhs) {
    return std::tuple{replace_child(lhs, new_child_lhs, out_cols_lhs, stream, mr),
                      replace_child(rhs, new_child_rhs, out_cols_rhs, stream, mr),
                      std::move(out_cols_lhs),
                      std::move(out_cols_rhs)};
  };

  if (lhs.type().id() == type_id::LIST) {
    auto const child_lhs = cudf::lists_column_view{lhs}.get_sliced_child(stream);
    auto const child_rhs = cudf::lists_column_view{rhs}.get_sliced_child(stream);

    // Found a lists-of-structs column.
    if (child_lhs.type().id() == type_id::STRUCT) {
      auto const concatenated_children =
        cudf::detail::concatenate(std::vector<column_view>{child_lhs, child_rhs},
                                  stream,
                                  rmm::mr::get_current_device_resource());

      auto const ranks        = compute_ranks(concatenated_children->view(),
                                       column_null_order,
                                       stream,
                                       rmm::mr::get_current_device_resource());
      auto const ranks_slices = cudf::detail::slice(
        ranks->view(),
        {0, child_lhs.size(), child_lhs.size(), child_lhs.size() + child_rhs.size()},
        stream);

      out_cols_lhs.emplace_back(std::make_unique<column>(ranks_slices.front(), stream, mr));
      out_cols_rhs.emplace_back(std::make_unique<column>(ranks_slices.back(), stream, mr));

      return make_output(out_cols_lhs.back()->view(), out_cols_rhs.back()->view());

    }
    // Found a lists-of-lists column.
    else if (child_lhs.type().id() == type_id::LIST) {
      // Recursively call transformation on the child column.
      auto [new_child_lhs, new_child_rhs, out_cols_child_lhs, out_cols_child_rhs] =
        transform_lists_of_structs(child_lhs, child_rhs, column_null_order, stream, mr);

      // Only transform the current pair of columns if their children have been transformed.
      if (out_cols_child_lhs.size() > 0 || out_cols_child_rhs.size() > 0) {
        out_cols_lhs.insert(out_cols_lhs.end(),
                            std::make_move_iterator(out_cols_child_lhs.begin()),
                            std::make_move_iterator(out_cols_child_lhs.end()));
        out_cols_rhs.insert(out_cols_rhs.end(),
                            std::make_move_iterator(out_cols_child_rhs.begin()),
                            std::make_move_iterator(out_cols_child_rhs.end()));

        return make_output(new_child_lhs, new_child_rhs);
      }
    }
    // else: child is not STRUCT or LIST: just go to the end of this function, no transformation.
  }
  // else: lhs.type().id() != type_id::LIST.
  // In such situations, lhs.type().id() can still be type_id::STRUCT. However, any
  // structs-of-lists should be decomposed into empty struct type `Struct<>` before being
  // processed by this function so we do nothing here.

  // Passthrough: nothing changed.
  return {lhs, rhs, std::move(out_cols_lhs), std::move(out_cols_rhs)};
}

}  // namespace

std::shared_ptr<preprocessed_table> preprocessed_table::create(
  table_view const& preprocessed_input,
  std::vector<int>&& verticalized_col_depths,
  std::vector<std::unique_ptr<column>>&& transformed_columns,
  host_span<order const> column_order,
  host_span<null_order const> null_precedence,
  bool has_ranked_children,
  rmm::cuda_stream_view stream)
{
  check_lex_compatibility(preprocessed_input);

  auto d_table = table_device_view::create(preprocessed_input, stream);
  auto d_column_order =
    detail::make_device_uvector_async(column_order, stream, rmm::mr::get_current_device_resource());
  auto d_null_precedence = detail::make_device_uvector_async(
    null_precedence, stream, rmm::mr::get_current_device_resource());
  auto d_depths = detail::make_device_uvector_async(
    verticalized_col_depths, stream, rmm::mr::get_current_device_resource());

  if (detail::has_nested_columns(preprocessed_input)) {
    auto [dremel_data, d_dremel_device_view] = list_lex_preprocess(preprocessed_input, stream);
    return std::shared_ptr<preprocessed_table>(
      new preprocessed_table(std::move(d_table),
                             std::move(d_column_order),
                             std::move(d_null_precedence),
                             std::move(d_depths),
                             std::move(dremel_data),
                             std::move(d_dremel_device_view),
                             std::move(transformed_columns),
                             has_ranked_children));
  } else {
    return std::shared_ptr<preprocessed_table>(
      new preprocessed_table(std::move(d_table),
                             std::move(d_column_order),
                             std::move(d_null_precedence),
                             std::move(d_depths),
                             std::move(transformed_columns),
                             has_ranked_children));
  }
}

std::shared_ptr<preprocessed_table> preprocessed_table::create(
  table_view const& input,
  host_span<order const> column_order,
  host_span<null_order const> null_precedence,
  rmm::cuda_stream_view stream)
{
  auto [decomposed_input, new_column_order, new_null_precedence, verticalized_col_depths] =
    decompose_structs(input, decompose_lists_column::NO, column_order, null_precedence);

  // Transform any (nested) lists-of-structs column into lists-of-integers column.
  std::vector<std::unique_ptr<column>> transformed_columns;
  auto const transformed_input =
    [&, &decomposed_input = decomposed_input, &new_null_precedence = new_null_precedence] {
      std::vector<column_view> transformed_cvs;

      for (size_type col_idx = 0; col_idx < decomposed_input.num_columns(); ++col_idx) {
        auto const& lhs_col = decomposed_input.column(col_idx);

        auto [transformed, curr_out_cols] = transform_lists_of_structs(
          lhs_col,
          null_precedence.empty() ? null_order::BEFORE : new_null_precedence[col_idx],
          stream,
          rmm::mr::get_current_device_resource());

        transformed_cvs.emplace_back(std::move(transformed));
        transformed_columns.insert(transformed_columns.end(),
                                   std::make_move_iterator(curr_out_cols.begin()),
                                   std::make_move_iterator(curr_out_cols.end()));
      }

      return table_view{transformed_cvs};
    }();

  auto const has_ranked_children = !transformed_columns.empty();
  return create(transformed_input,
                std::move(verticalized_col_depths),
                std::move(transformed_columns),
                new_column_order,
                new_null_precedence,
                has_ranked_children,
                stream);
}

std::pair<std::shared_ptr<preprocessed_table>, std::shared_ptr<preprocessed_table>>
preprocessed_table::create(table_view const& lhs,
                           table_view const& rhs,
                           host_span<order const> column_order,
                           host_span<null_order const> null_precedence,
                           rmm::cuda_stream_view stream)
{
  check_shape_compatibility(lhs, rhs);

  auto [decomposed_lhs,
        new_column_order_lhs,
        new_null_precedence_lhs,
        verticalized_col_depths_lhs] =
    decompose_structs(lhs, decompose_lists_column::NO, column_order, null_precedence);

  // Unused variables are new column order and null order for rhs, which are the same as for lhs
  // so we don't need them.
  [[maybe_unused]] auto [decomposed_rhs, unused0, unused1, verticalized_col_depths_rhs] =
    decompose_structs(rhs, decompose_lists_column::NO, column_order, null_precedence);

  // Transform any (nested) lists-of-structs column into lists-of-integers column.
  std::vector<std::unique_ptr<column>> transformed_columns_lhs;
  std::vector<std::unique_ptr<column>> transformed_columns_rhs;
  auto const [transformed_lhs,
              transformed_rhs] = [&,
                                  &decomposed_lhs          = decomposed_lhs,
                                  &decomposed_rhs          = decomposed_rhs,
                                  &new_null_precedence_lhs = new_null_precedence_lhs] {
    std::vector<column_view> transformed_lhs_cvs;
    std::vector<column_view> transformed_rhs_cvs;

    for (size_type col_idx = 0; col_idx < decomposed_lhs.num_columns(); ++col_idx) {
      auto const& lhs_col = decomposed_lhs.column(col_idx);
      auto const& rhs_col = decomposed_rhs.column(col_idx);

      auto [transformed_lhs, transformed_rhs, curr_out_cols_lhs, curr_out_cols_rhs] =
        transform_lists_of_structs(
          lhs_col,
          rhs_col,
          null_precedence.empty() ? null_order::BEFORE : null_precedence[col_idx],
          stream,
          rmm::mr::get_current_device_resource());

      transformed_lhs_cvs.emplace_back(std::move(transformed_lhs));
      transformed_rhs_cvs.emplace_back(std::move(transformed_rhs));
      transformed_columns_lhs.insert(transformed_columns_lhs.end(),
                                     std::make_move_iterator(curr_out_cols_lhs.begin()),
                                     std::make_move_iterator(curr_out_cols_lhs.end()));
      transformed_columns_rhs.insert(transformed_columns_rhs.end(),
                                     std::make_move_iterator(curr_out_cols_rhs.begin()),
                                     std::make_move_iterator(curr_out_cols_rhs.end()));
    }

    return std::pair{table_view{transformed_lhs_cvs}, table_view{transformed_rhs_cvs}};
  }();

  // This should be the same for both lhs and rhs but not all the time, such as when one table
  // has 0 rows while the other has >0 rows. So we check separately for each of them.
  auto const has_ranked_children_lhs = !transformed_columns_lhs.empty();
  auto const has_ranked_children_rhs = !transformed_columns_rhs.empty();

  return {create(transformed_lhs,
                 std::move(verticalized_col_depths_lhs),
                 std::move(transformed_columns_lhs),
                 new_column_order_lhs,
                 new_null_precedence_lhs,
                 has_ranked_children_lhs,
                 stream),
          create(transformed_rhs,
                 std::move(verticalized_col_depths_rhs),
                 std::move(transformed_columns_rhs),
                 new_column_order_lhs,
                 new_null_precedence_lhs,
                 has_ranked_children_rhs,
                 stream)};
}

preprocessed_table::preprocessed_table(
  table_device_view_owner&& table,
  rmm::device_uvector<order>&& column_order,
  rmm::device_uvector<null_order>&& null_precedence,
  rmm::device_uvector<size_type>&& depths,
  std::vector<detail::dremel_data>&& dremel_data,
  rmm::device_uvector<detail::dremel_device_view>&& dremel_device_views,
  std::vector<std::unique_ptr<column>>&& transformed_columns,
  bool has_ranked_children)
  : _t(std::move(table)),
    _column_order(std::move(column_order)),
    _null_precedence(std::move(null_precedence)),
    _depths(std::move(depths)),
    _dremel_data(std::move(dremel_data)),
    _dremel_device_views(std::move(dremel_device_views)),
    _transformed_columns(std::move(transformed_columns)),
    _has_ranked_children(has_ranked_children)
{
}

preprocessed_table::preprocessed_table(table_device_view_owner&& table,
                                       rmm::device_uvector<order>&& column_order,
                                       rmm::device_uvector<null_order>&& null_precedence,
                                       rmm::device_uvector<size_type>&& depths,
                                       std::vector<std::unique_ptr<column>>&& transformed_columns,
                                       bool has_ranked_children)
  : _t(std::move(table)),
    _column_order(std::move(column_order)),
    _null_precedence(std::move(null_precedence)),
    _depths(std::move(depths)),
    _dremel_data{},
    _dremel_device_views{},
    _transformed_columns(std::move(transformed_columns)),
    _has_ranked_children(has_ranked_children)
{
}

two_table_comparator::two_table_comparator(table_view const& left,
                                           table_view const& right,
                                           host_span<order const> column_order,
                                           host_span<null_order const> null_precedence,
                                           rmm::cuda_stream_view stream)
{
  std::tie(d_left_table, d_right_table) =
    preprocessed_table::create(left, right, column_order, null_precedence, stream);
}

}  // namespace lexicographic

namespace equality {

std::shared_ptr<preprocessed_table> preprocessed_table::create(table_view const& t,
                                                               rmm::cuda_stream_view stream)
{
  check_eq_compatibility(t);

  auto [null_pushed_table, nullable_data] =
    structs::detail::push_down_nulls(t, stream, rmm::mr::get_current_device_resource());
  auto struct_offset_removed_table = remove_struct_child_offsets(null_pushed_table);
  auto verticalized_t =
    std::get<0>(decompose_structs(struct_offset_removed_table, decompose_lists_column::YES));

  auto d_t = table_device_view_owner(table_device_view::create(verticalized_t, stream));
  return std::shared_ptr<preprocessed_table>(new preprocessed_table(
    std::move(d_t), std::move(nullable_data.new_null_masks), std::move(nullable_data.new_columns)));
}

two_table_comparator::two_table_comparator(table_view const& left,
                                           table_view const& right,
                                           rmm::cuda_stream_view stream)
  : d_left_table{preprocessed_table::create(left, stream)},
    d_right_table{preprocessed_table::create(right, stream)}
{
  check_shape_compatibility(left, right);
}

}  // namespace equality

}  // namespace row
}  // namespace experimental
}  // namespace cudf
